package com.bieber.server.netty.handler;

import com.bieber.common.Constants;
import com.bieber.server.Transporter;
import com.bieber.server.executor.ChannelTask;
import com.bieber.server.pack.FilePackage;
import io.netty.channel.Channel;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by bieber on 2015/8/19.
 *
 * <p>{@link TransporterHandler} that hands every incoming {@link FilePackage} to one of a fixed
 * set of worker threads. A channel is bound to a single worker the first time it is seen, so all
 * packages of one channel go through the same FIFO queue and are run by the same thread in the
 * order they were submitted.
 *
 * <p>The workers are created and started once, when this class is initialised, and are shared by
 * all instances of the handler.
 *
 * <p>NOTE(review): nothing in this class removes entries from the channel/worker map, so closed
 * channels stay referenced unless they are evicted elsewhere - confirm and consider removing the
 * mapping when a channel closes.
 */
public class ExecutorHandler implements TransporterHandler {

    /** Worker pool; sized once from the processor count observed at class initialisation. */
    private static final ChannelExecutor[] EXECUTORS = new ChannelExecutor[Runtime.getRuntime().availableProcessors()];

    /** Round-robin cursor used by {@link #getNext()}. */
    private static final AtomicInteger childIndex = new AtomicInteger(0);

    /** Remembers which worker each channel was bound to. */
    private static final ConcurrentHashMap<Channel,ChannelExecutor> CHANNEL_EXECUTOR_MAPPER = new ConcurrentHashMap<Channel, ChannelExecutor>();

    static {
        // Iterate over the array length rather than re-reading availableProcessors(): that value
        // may change while the JVM runs, which would either overrun the array or leave null slots.
        for (int i = 0; i < EXECUTORS.length; i++) {
            EXECUTORS[i] = new ChannelExecutor("TRANSPORTER-ACCEPTOR-EXECUTOR-" + i);
            EXECUTORS[i].start();
        }
    }

    /**
     * Picks the next worker in round-robin order.
     *
     * @return the selected worker
     */
    public ChannelExecutor getNext(){
        // The remainder turns negative once the counter overflows; Math.abs maps it back into range.
        return EXECUTORS[Math.abs(childIndex.getAndIncrement() % EXECUTORS.length)];
    }


    /**
     * Queues a {@link ChannelTask} for the given package on the worker bound to {@code channel},
     * binding the channel to the next round-robin worker if it has none yet.
     *
     * @param channel     channel the package arrived on
     * @param filePackage package to process
     */
    @Override
    public void doHandle(Channel channel,FilePackage filePackage) {
        ChannelExecutor executor = CHANNEL_EXECUTOR_MAPPER.get(channel);
        if (executor == null) {
            ChannelExecutor candidate = getNext();
            // Another thread may have bound the channel in the meantime; if so, its choice wins.
            ChannelExecutor winner = CHANNEL_EXECUTOR_MAPPER.putIfAbsent(channel, candidate);
            executor = (winner != null) ? winner : candidate;
        }
        executor.submit(new ChannelTask(channel, filePackage));
    }

    /**
     * Worker thread that runs submitted tasks one at a time, in submission order.
     *
     * <p>NOTE(review): the thread is not marked as a daemon and its loop has no exit condition.
     */
    static class ChannelExecutor extends Thread{

        /** Pending tasks, consumed in FIFO order by {@link #run()}. */
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<Runnable>();

        /**
         * @param name thread name
         */
        public ChannelExecutor(String name) {
            super(name);
        }

        /**
         * Appends a task to this worker's queue.
         *
         * <p>If the calling thread is interrupted while enqueuing, the task is NOT queued; the
         * interruption is only logged.
         * NOTE(review): the caller's interrupt status is not restored here - confirm this is intended.
         *
         * @param runnable task to run on this worker
         */
        public void submit(Runnable runnable){
            try {
                tasks.put(runnable);
            } catch (InterruptedException e) {
                Constants.COMMON_LOGGER.warn("ChannelExecutor occur an exception", e);
            }
        }

        /**
         * Takes tasks from the queue and runs them, forever. Neither an interruption while waiting
         * nor a failing task ends the loop.
         */
        @Override
        public void run() {
            while (true) {
                Runnable task;
                try {
                    task = tasks.take();
                } catch (InterruptedException e) {
                    // Interruption is logged and the worker keeps serving its queue.
                    Constants.COMMON_LOGGER.warn("ChannelExecutor occur an exception",e);
                    continue;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker: the channels bound to it would keep
                    // queueing tasks that nobody runs.
                    Constants.COMMON_LOGGER.warn("ChannelExecutor task threw an exception", e);
                }
            }
        }
    }
}

